Skip to content

[SPARK-57760][STREAMING] Make small optimizations to StatefulProcessorApiClient#56786

Open
funrollloops wants to merge 3 commits into
apache:masterfrom
funrollloops:tws-opt-1
Open

[SPARK-57760][STREAMING] Make small optimizations to StatefulProcessorApiClient#56786
funrollloops wants to merge 3 commits into
apache:masterfrom
funrollloops:tws-opt-1

Conversation

@funrollloops

@funrollloops funrollloops commented Jun 25, 2026

Copy link
Copy Markdown

What changes were proposed in this pull request?

Make two small optimizations to StatefulProcessorApiClient:

  1. Call PickleSerializer() instead of using the default CPickleSerializer (which is CloudPickleSerializer). We don't need the latter since this path does not deal with code objects.
  2. Micro-optimize state value normalization: add a fast-path for primitives, prefer map to generator comprehensions, and move the numpy import and function definition to the top level so it is done once

Benchmarks

This is a microbenchmark for _serialize_to_bytes:

Before
--- single-field tuples ---
  python int     → LongType                         p50=  4.11µs  p95=  4.21µs  p99=  6.49µs
  python float   → DoubleType                       p50=  4.10µs  p95=  4.20µs  p99=  4.96µs
  np.float64     → DoubleType                       p50=  4.42µs  p95=  4.57µs  p99=  6.46µs
  np.datetime64  → Timestamp                        p50=  7.32µs  p95=  7.67µs  p99= 11.60µs
  pd.Timestamp   → Timestamp                        p50=  7.53µs  p95=  7.76µs  p99= 12.34µs

--- wider tuples ---
  10× python float → 10× DoubleType                 p50=  7.80µs  p95=  7.94µs  p99= 12.37µs
  10× np.float64   → 10× DoubleType                 p50=  9.01µs  p95=  9.23µs  p99= 14.88µs
  mixed (np.f64, np.i64, str, pd.Ts)                p50=  9.93µs  p95= 10.26µs  p99= 17.66µs


After
--- single-field tuples ---
  python int     → LongType                         p50=  1.17µs  p95=  1.19µs  p99=  1.22µs
  python float   → DoubleType                       p50=  1.18µs  p95=  1.19µs  p99=  1.22µs
  np.float64     → DoubleType                       p50=  1.92µs  p95=  1.98µs  p99=  2.01µs
  np.datetime64  → Timestamp                        p50=  4.59µs  p95=  4.71µs  p99=  4.78µs
  pd.Timestamp   → Timestamp                        p50=  5.07µs  p95=  5.17µs  p99=  5.24µs

--- wider tuples ---
  10× python float → 10× DoubleType                 p50=  2.19µs  p95=  2.23µs  p99=  2.26µs
  10× np.float64   → 10× DoubleType                 p50=  7.72µs  p95=  7.82µs  p99=  7.89µs
  mixed (np.f64, np.i64, str, pd.Ts)                p50=  7.34µs  p95=  7.46µs  p99=  7.58µs

Why are the changes needed?

Together these changes improve transform with state on a simple rolling-window style benchmark by ~10%.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests.

Was this patch authored or co-authored using generative AI tooling?

No, but Claude was consulted in the process of producing this PR.

@funrollloops funrollloops changed the title Make small optimizations to StatefulProcessorApiClient [SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient Jun 25, 2026
@funrollloops funrollloops changed the title [SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient [WIP][SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient Jun 25, 2026

@HyukjinKwon HyukjinKwon left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 blocking, 0 non-blocking, 1 nit.
The closure-hoisting optimization is good; the serializer swap and the fast-path subclass leak read as unintended behavior changes for a "small optimizations" PR.

Correctness (2)

  • stateful_processor_api_client.py:110: PickleSerializer() drops cloudpickle — CPickleSerializer defaults to CloudPickleSerializer — see inline
  • stateful_processor_api_client.py:45: scalar fast-path returns np.float64/pd.Timestamp unconverted (they subclass float/datetime) — see inline

Nits: 1 minor item (see inline comments).

Verification

Confirmed empirically: issubclass(np.float64, float) and issubclass(pandas.Timestamp, datetime) are both True, while np.int64/np.bool_ are not subclasses of int/bool. So the new fast-path leaks exactly np.float64 and pd.Timestamp (returned as-is, skipping .tolist()/.to_pydatetime()), while np.int64/np.bool_ still correctly reach the np.generic branch.

PR description suggestions

  • Document: why the serializer is changed from CPickleSerializer to PickleSerializer (this is a capability change, not just an optimization) — and whether dropping cloudpickle is intended.
  • Add: a real JIRA id (currently SPARK-?????) and a short note on what is being optimized and how it was measured.

Comment thread python/pyspark/sql/streaming/stateful_processor_api_client.py
Comment thread python/pyspark/sql/streaming/stateful_processor_api_client.py Outdated
Comment thread python/pyspark/sql/streaming/stateful_processor_api_client.py Outdated
Benchmarked a tuple vs a frozenset, and the tuple came out noticeably
faster.
@funrollloops

Copy link
Copy Markdown
Author

Hi @HyukjinKwon, I'm waiting on my Jira account to be created. I'll create a Jira ticket for this PR once I am able.

Attached is the script I used to benchmark the _serialize_to_bytes function.

bm_serialize.py

@funrollloops funrollloops changed the title [WIP][SPARK-?????][STREAMING] Make small optimizations to StatefulProcessorApiClient [WIP][SPARK-57760][STREAMING] Make small optimizations to StatefulProcessorApiClient Jun 29, 2026
@funrollloops funrollloops changed the title [WIP][SPARK-57760][STREAMING] Make small optimizations to StatefulProcessorApiClient [SPARK-57760][STREAMING] Make small optimizations to StatefulProcessorApiClient Jun 29, 2026
@funrollloops funrollloops marked this pull request as ready for review June 29, 2026 23:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants